Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improves/Fixes TableBackedProgressManager #10

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

hartmut-co-uk
Copy link
Contributor

Hi, I've been playing, testing and debugging with this lib + simple-printer example, also making use of TableBackedProgressManager + NewPeriodicProgressReporter in particular.

For some unexpected behaviour I came up with improvements/fixes I wanted to share and maybe discuss.
I've added my current findings + a new example into this PR.

Changes included

  • adds method Empty(..) to ChangeConsumer
    (allows for marking CDC log stream time as processed, fixes 'bug' where second start of a reader begins reading CDC log on generated_created timestamp instead of ChangeAgeLimit / actual latest processed state)
  • Simplify AdvancedReaderConfig options [PostEmptyQueryDelay, PostFailedQueryDelay] -> single 'PostQueryDelay' which is also considered when calculating next PollWindow
  • adds new example 'simple-printer-stateful' which correctly maintains the CDC reader's state using TableBackedProgressManager + NewPeriodicProgressReporter
  • adds some commented out 'logging' lines for debugging purposes

Options for further improvements:

  • gracefully shutdown the reader+consumers allowing to mark/save current progress before exit (~call consumer.End() / reporter.SaveAndStop(ctx context.Context))

Any questions, please reach out!

- adds method Empty(..) to ChangeConsumer
  (allows for marking CDC log stream time as processed, fixes 'bug' where second start of a reader begins reading CDC log on generated_created timestamp instead of ChangeAgeLimit / actual latest processed state)
- Simplify AdvancedReaderConfig options [PostEmptyQueryDelay, PostFailedQueryDelay] -> single 'PostQueryDelay' which is also considered when calculating next PollWindow
- adds new example 'simple-printer-stateful' which correctly maintains the CDC reader's state using TableBackedProgressManager + NewPeriodicProgressReporter
- adds some commented out 'logging' lines for debugging purposes
@hartmut-co-uk
Copy link
Contributor Author

cc: @avelanarius, @piodul

@hartmut-co-uk
Copy link
Contributor Author

cc: @haaawk

@hartmut-co-uk
Copy link
Contributor Author

hartmut-co-uk commented Dec 14, 2021

Latency (from 'write to table' ... 'Reader.Consume()') when the reader has fully caught up can be calculated as simple as
Latency = ConfidenceWindowSize + PostQueryDelay

e.g. for

Advanced: scyllacdc.AdvancedReaderConfig{
	ChangeAgeLimit:       15 * time.Minute,
	ConfidenceWindowSize: 10 * time.Second,
	PostQueryDelay:       5 * time.Second,
	PostFailedQueryDelay: 5 * time.Second,
	QueryTimeWindowSize:  5 * 60 * time.Second,
},

=> Latency: 15s
*with queries against scylla every 5s (PostQueryDelay) per [reader::table::vnodeId] ... -> with query windows of 5s
**also on initial startup or when behind + catching up QueryTimeWindowSize is used allowing for faster progress -> with query window size 5min (with above example).

Copy link
Collaborator

@piodul piodul left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I apologize for leaving this PR unattended for so long.

The idea to introduce Empty() so that saving progress is possible when no rows are appearing on the stream looks good. I wasn't sure whether you are still interested in this PR, so I went ahead and used your idea in another PR: #13 (marking you as a co-author, of course). The most important change I made was moving Empty() from ChangeConsumer to a new, optional interface which extends ChangeConsumer, so that the changes are backwards-compatible.

I left review comments in this PR with some questions. I'm not sure whether you are still interested in the PR, so feel free to respond or disregard my comments.

@@ -203,14 +209,14 @@ func (sbr *streamBatchReader) getPollWindow() pollWindow {
if queryWindowRightEnd.Before(confidenceWindowStart) {
return pollWindow{
begin: windowStart,
end: gocql.MinTimeUUID(queryWindowRightEnd),
end: gocql.MinTimeUUID(queryWindowRightEnd.Add(sbr.config.Advanced.PostQueryDelay)),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem correct. Due to the distributed nature of Scylla and CDC, rows with recent cdc$time may appear after some delay, so ConfidenceWindowSize is supposed to protect from that - we don't read rows with cdc$time newer than Now() - ConfidenceWindowSize. Here, it looks like you may break that assumption as queryWindowRightEnd + PostQueryDelay may be further than confidenceWindowStart.

Do you remember why you introduced that change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. after reading my code multiple times now - I think it was because the window is calculated first, then the delay is resolved and executed. I think this was very weird to understand and maybe some wider refactoring should be done?

see ref:

wnd = sbr.getPollWindow()
var delay time.Duration
if err != nil {
delay = sbr.config.Advanced.PostFailedQueryDelay
} else {
delay = sbr.config.Advanced.PostQueryDelay
}
delayUntil := windowProcessingStartTime.Add(delay)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was one of those details I only was able to discover while debugging with the added detailed logs. I remember this was confusing and time consuming. ^^

With this in place I was able to achieve and actually see the exact 15s expected latency, as I mentioned in my 4th comment.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. after reading my code multiple times now - I think it was because the window is calculated first, then the delay is resolved and executed. I think this was very weird to understand and maybe some wider refactoring should be done?

Right, now I see that the delay is wrong. The window is calculated first, then there is a sleep, then the loop goes to the next iteration and uses the query window calculated before the sleep. This is clearly wrong, the window should be used in a query immediately after being computed. The proper fix would be to move wnd = sbr.getPollWindow() at the end of the loop.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or - even better - wnd := sbr.getPollWindow() should be put at the beginning of the for loop and the other calls to getPollWindow() removed. I really don't know why it wasn't written like that in the beginning...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also agree that this code could use some refactoring :) Perhaps the logic responsible for tracking per-stream progress and calculating windows could be moved to an abstraction separate from the streamBatchReader.

Copy link
Contributor Author

@hartmut-co-uk hartmut-co-uk Oct 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, current state of the PR could be called a workaround at best 😅, glad I remembered why added this at least. Thanks for pointing out 🧐


touchesConfidenceWindow: false,
}
}
return pollWindow{
begin: windowStart,
end: gocql.MinTimeUUID(confidenceWindowStart),
end: gocql.MinTimeUUID(confidenceWindowStart.Add(sbr.config.Advanced.PostQueryDelay)),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same issue as above - here, the end will be later than confidenceWindowStart.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(see above)

// a delay. This parameter specifies the length of the delay.
//
// If the parameter is left as 0, the library will automatically adjust
// the length of the delay.
PostEmptyQueryDelay time.Duration
PostQueryDelay time.Duration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, this is a backwards-incompatible change and will require releasing a v2. I'd rather postpone it until more ideas for breaking changes accumulate and releasing v2 makes more sense.

Could you explain what is your motivation for unifying both parameters? The idea for having separate delays was that an empty query result was a signal that the library is polling too fast and wastes time on empty queries. Waiting longer for the next query should increase chances that the next query will have a non-empty result.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I can't remember. I think I couldn't see any valid use case.
For my own CDC scenarios I was keen to get/keep my latency from record write time to CDC event as low as possible.

I agree it's not a good idea to introduce a breaking change for this.

But I think it also had something to do with the problem mentioned in regards to getPollWindow() executed before the delay is happening... (see further down)

@@ -100,6 +100,8 @@ outer:

if compareTimeuuid(wnd.begin, wnd.end) < 0 {
var iter *changeRowIterator
//sbr.config.Logger.Printf("queryRange: %s.%s :: %s (%s) [%s ... %s]",
// crq.keyspaceName, crq.tableName, crq.pkCondition, crq.bindArgs[0], wnd.begin.Time(), wnd.end.Time())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not leave commented out code in the repository. Perhaps the Logger interface could be extended so that it understand various verbosity level and this message could have low verbosity (debug/trace)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine for me either way - I mostly wanted to help understand how I've been debugging this - so I committed this along with for review purposes.

If you actually would like to ultimately merge this PR, let me know if I should drop..

@@ -71,6 +71,7 @@ func (ppr *PeriodicProgressReporter) Start(ctx context.Context) {
ppr.mu.Unlock()

// TODO: Log errors?
//ppr.logger.Printf("MarkProgress for %s: %s", ppr.reporter.streamID, timeToReport.Time())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue with logging here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, same as above

// Make sure you create the following table before you run this example:
// CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH cdc = {'enabled': 'true'};

func main() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what are the differences between simple-printer-stateful and complicated-consumer. Is there a good reason why you kept them separate? Perhaps the complicated-consumer could be improved in some way?

Copy link
Contributor Author

@hartmut-co-uk hartmut-co-uk Oct 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complicated-consumer

What do you mean by 'complicated-consumer'?

I think this simply was the example I've put together and was working with while debugging and attempting to fix the behaviour. So I've added it for reference.

Please feel free to keep, improve or drop. :-)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, "complicated-consumer" (which is actually called "complicated-printer") was an example that I apparently forgot to push. It's fine then, we can take your example instead.

@hartmut-co-uk
Copy link
Contributor Author

thx @piodul - good to see some movement on the CDC front...
I'm still interested to contribute / help with feedback, but I'm quite out of touch with this PR, so I'll need some focus time to review again myself... :-)
I'll try to make time soon.

@@ -203,14 +209,14 @@ func (sbr *streamBatchReader) getPollWindow() pollWindow {
if queryWindowRightEnd.Before(confidenceWindowStart) {
return pollWindow{
begin: windowStart,
end: gocql.MinTimeUUID(queryWindowRightEnd),
end: gocql.MinTimeUUID(queryWindowRightEnd.Add(sbr.config.Advanced.PostQueryDelay)),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. after reading my code multiple times now - I think it was because the window is calculated first, then the delay is resolved and executed. I think this was very weird to understand and maybe some wider refactoring should be done?

see ref:

wnd = sbr.getPollWindow()
var delay time.Duration
if err != nil {
delay = sbr.config.Advanced.PostFailedQueryDelay
} else {
delay = sbr.config.Advanced.PostQueryDelay
}
delayUntil := windowProcessingStartTime.Add(delay)


touchesConfidenceWindow: false,
}
}
return pollWindow{
begin: windowStart,
end: gocql.MinTimeUUID(confidenceWindowStart),
end: gocql.MinTimeUUID(confidenceWindowStart.Add(sbr.config.Advanced.PostQueryDelay)),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(see above)

// Make sure you create the following table before you run this example:
// CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH cdc = {'enabled': 'true'};

func main() {
Copy link
Contributor Author

@hartmut-co-uk hartmut-co-uk Oct 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complicated-consumer

What do you mean by 'complicated-consumer'?

I think this simply was the example I've put together and was working with while debugging and attempting to fix the behaviour. So I've added it for reference.

Please feel free to keep, improve or drop. :-)

@@ -236,6 +236,7 @@ func (tbpm *TableBackedProgressManager) SaveProgress(ctx context.Context, gen ti
tbpm.concurrentQueryLimiter.Acquire(ctx, 1)
defer tbpm.concurrentQueryLimiter.Release(1)

//log.Printf("SaveProgress for %s = %s\n", streamID, progress.LastProcessedRecordTime)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: here's one more unused log

// a delay. This parameter specifies the length of the delay.
//
// If the parameter is left as 0, the library will automatically adjust
// the length of the delay.
PostEmptyQueryDelay time.Duration
PostQueryDelay time.Duration
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I can't remember. I think I couldn't see any valid use case.
For my own CDC scenarios I was keen to get/keep my latency from record write time to CDC event as low as possible.

I agree it's not a good idea to introduce a breaking change for this.

But I think it also had something to do with the problem mentioned in regards to getPollWindow() executed before the delay is happening... (see further down)

@@ -100,6 +100,8 @@ outer:

if compareTimeuuid(wnd.begin, wnd.end) < 0 {
var iter *changeRowIterator
//sbr.config.Logger.Printf("queryRange: %s.%s :: %s (%s) [%s ... %s]",
// crq.keyspaceName, crq.tableName, crq.pkCondition, crq.bindArgs[0], wnd.begin.Time(), wnd.end.Time())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine for me either way - I mostly wanted to help understand how I've been debugging this - so I committed this along with for review purposes.

If you actually would like to ultimately merge this PR, let me know if I should drop..

@@ -71,6 +71,7 @@ func (ppr *PeriodicProgressReporter) Start(ctx context.Context) {
ppr.mu.Unlock()

// TODO: Log errors?
//ppr.logger.Printf("MarkProgress for %s: %s", ppr.reporter.streamID, timeToReport.Time())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, same as above

@hartmut-co-uk
Copy link
Contributor Author

@piodul shall I close this PR?
Or would you want to reuse anything else or some of the suggestions?
e.g.

Options for further improvements:

  • gracefully shutdown the reader+consumers allowing to mark/save current progress before exit (~call consumer.End() / reporter.SaveAndStop(ctx context.Context))

@dkropachev
Copy link
Collaborator

@hartmut-co-uk , thanks for your contribution, this project recently have changed it's maintainers, can you please let me know if you are willing to continue work on this PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants